mod echo;

use echo::*;
use gabriel2::*;

#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState { counter: 0 };

    let echo_ref = ActorRef::new("echo", EchoActor {}, state, 100000).await?;

    println!("Sent Ping");
    echo_ref.send(EchoMessage::Ping).await?;
    println!("Sent Ping and ask response");
    let pong = echo_ref.ask(EchoMessage::Ping).await?;
    println!("Got {:?}", pong);

    _ = echo_ref.stop().await;
    Ok(())
}

#[cfg(test)]
mod tests {
    use crate::echo::{EchoActor, EchoError, EchoMessage, EchoResponse, EchoState};
    use futures::StreamExt;
    use gabriel2::balancer::LoadBalancer;
    use gabriel2::sink_stream::ActorSinkTrait;
    use gabriel2::sink_stream::{ActorSink, ActorSinkStreamTrait};
    use gabriel2::*;
    use std::sync::Arc;

    #[tokio::test]
    async fn test_remote() -> anyhow::Result<()> {
        use gabriel2::remote::*;
        // let _ = env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("trace")).try_init();

        let state = EchoState { counter: 0 };

        let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;
        let echo_server = ActorServer::new("echo_server", "127.0.0.1", 9001, echo_ref).await?;
        let echo_client: Arc<
            ActorClient<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>,
        > = ActorClient::new("echo_client", "127.0.0.1", 9001).await?;

        println!("Sent Ping");
        echo_client.send(EchoMessage::Ping).await?;

        println!("Sent Ping and ask response");
        let pong = echo_client.ask(EchoMessage::Ping).await?;
        println!("Got {:?}", pong);

        _ = echo_client.stop().await;
        _ = echo_server.stop().await;
        Ok(())
    }

    #[tokio::test]
    async fn test_sink() -> anyhow::Result<()> {
        // let _ = env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("trace")).try_init();

        let state = EchoState { counter: 0 };

        let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;
        let echo_sink = ActorSink::sink(echo_ref.clone());
        let message_stream = futures::stream::iter(vec![
            EchoMessage::Ping,
            EchoMessage::Ping,
            EchoMessage::Ping,
        ])
        .map(Ok);
        _ = message_stream.forward(echo_sink).await;
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        Ok(())
    }

    #[tokio::test]
    async fn test_sink_stream() -> anyhow::Result<()> {
        // let _ = env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("trace")).try_init();

        let state = EchoState { counter: 0 };

        let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;
        let (echo_sink, echo_stream) = ActorSink::sink_stream(echo_ref.clone());
        let message_stream = futures::stream::iter(vec![
            EchoMessage::Ping,
            EchoMessage::Ping,
            EchoMessage::Ping,
        ])
        .map(Ok);
        _ = message_stream.forward(echo_sink).await;
        echo_stream
            .for_each(|message| async move {
                println!("Got {:?}", message.unwrap());
            })
            .await;
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        Ok(())
    }

    #[derive(Debug, Copy, Clone)]
    enum EventElement {
        Fire,
        Water,
    }

    #[tokio::test]
    async fn check_subscription() -> anyhow::Result<()> {
        use crate::broadcast::*;
        let event_bus: Arc<EventBus<EventElement>> = Arc::new(EventBus::new());
        let subscriber_id = event_bus
            .subscribe(|event| async move {
                let otp = match event {
                    EventElement::Fire => "FIIIRE!",
                    EventElement::Water => "WAAATER!",
                };
                println!("{}", otp);
            })
            .await;

        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        event_bus.publish(EventElement::Water).await?;
        event_bus.publish(EventElement::Fire).await?;
        event_bus.publish(EventElement::Water).await?;
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        _ = event_bus.unsubscribe(subscriber_id);

        let _subscriber_id = event_bus
            .subscribe(|event| async move {
                let otp = match event {
                    EventElement::Fire => "Y",
                    EventElement::Water => "N",
                };
                println!("{}", otp);
            })
            .await;

        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        event_bus.publish(EventElement::Fire).await?;
        event_bus.publish(EventElement::Water).await?; // FIXME: subscribers will see only last publish.
        let _subscriber_id = event_bus
            .subscribe(|event| async move {
                let otp = match event {
                    EventElement::Fire => "YY",
                    EventElement::Water => "NN",
                };
                println!("{}", otp);
            })
            .await;
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

        event_bus.publish(EventElement::Fire).await?;
        event_bus.publish(EventElement::Water).await?; // FIXME: subscribers will see only last publish.
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        Ok(())
    }

    #[tokio::test]
    async fn actor_event_bus_test() {
        use crate::broadcast::*;
        let event_bus: Arc<EventBus<EventElement>> = Arc::new(EventBus::new());
        let state = EchoState { counter: 0 };
        let echo_ref = Arc::new(
            ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000)
                .await
                .unwrap(),
        );

        let e = echo_ref.clone();

        let subscriber_id = event_bus
            .subscribe(move |event: EventElement| {
                let e = e.clone();
                async move {
                    match event {
                        EventElement::Fire => {
                            e.send(EchoMessage::Ping).await.unwrap();
                            ()
                        }
                        _ => (),
                    }
                }
            })
            .await;
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

        event_bus.publish(EventElement::Fire).await.unwrap();
        event_bus.publish(EventElement::Fire).await.unwrap();
        event_bus.publish(EventElement::Water).await.unwrap();
        event_bus.publish(EventElement::Fire).await.unwrap();
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        event_bus.unsubscribe(subscriber_id).await;

        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

        let state =
            if let Ok(EchoResponse::Pong { counter }) = echo_ref.ask(EchoMessage::Ping).await {
                counter
            } else {
                0
            };

        assert_eq!(state, 4)
    }

    #[tokio::test]
    async fn create_load_balancer() {
        // env_logger::init();
        let _ = env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("trace")).try_init();

        let lb: Arc<LoadBalancer<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>> =
            LoadBalancer::new("user_load_balancer", 10, |id: usize| {
                Box::pin(async move {
                    let user: Arc<
                        ActorRef<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>,
                    > = ActorRef::new(
                        format!("echo-{}", id),
                        EchoActor {},
                        EchoState { counter: 0 },
                        10000,
                    )
                    .await?;
                    Ok(user)
                })
            })
            .await
            .unwrap();

        for _ in 0..30 {
            lb.send(EchoMessage::Ping).await.unwrap();
        }
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

        let mut res = 0;
        for id in 0..10 {
            let r = lb.state(id).await.unwrap().lock().await.counter;
            res += r;
        }

        println!("{}", res);
        assert_eq!(res, 30);
    }
}
